{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# SageMaker PySpark XGBoost MNIST Example\n",
    "\n",
    "1. [Introduction](#Introduction)\n",
    "2. [Setup](#Setup)\n",
    "3. [Loading the Data](#Loading-the-Data)\n",
    "4. [Training and Hosting a Model](#Training-and-Hosting-a-Model)\n",
    "5. [Inference](#Inference)\n",
    "6. [More on SageMaker Spark](#More-on-SageMaker-Spark)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Introduction\n",
    "This notebook will show how to classify handwritten digits using the XGBoost algorithm on Amazon SageMaker through the SageMaker PySpark library. We will train on Amazon SageMaker using XGBoost on the MNIST dataset, host the trained model on Amazon SageMaker, and then make predictions against that hosted model.\n",
    "\n",
    "Unlike the other notebooks that demonstrate XGBoost on Amazon SageMaker, this notebook uses a SparkSession to manipulate data, and uses the SageMaker Spark library to interact with SageMaker with Spark Estimators and Transformers.\n",
    "\n",
    "You can visit SageMaker Spark's GitHub repository at https://github.com/aws/sagemaker-spark to learn more about SageMaker Spark.\n",
    "\n",
    "You can visit XGBoost's GitHub repository at https://github.com/dmlc/xgboost to learn more about XGBoost\n",
    "\n",
    "This notebook was created and tested on an ml.m4.xlarge notebook instance."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup\n",
    "\n",
    "First, we import the necessary modules and create the SparkSession with the SageMaker Spark dependencies."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "from pyspark import SparkContext, SparkConf\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "import sagemaker\n",
    "from sagemaker import get_execution_role\n",
    "import sagemaker_pyspark\n",
    "\n",
    "role = get_execution_role()\n",
    "\n",
    "# Configure Spark to use the SageMaker Spark dependency jars\n",
    "jars = sagemaker_pyspark.classpath_jars()\n",
    "\n",
    "classpath = \":\".join(sagemaker_pyspark.classpath_jars())\n",
    "\n",
    "# See the SageMaker Spark Github repo under sagemaker-pyspark-sdk\n",
    "# to learn how to connect to a remote EMR cluster running Spark from a Notebook Instance.\n",
    "spark = SparkSession.builder.config(\"spark.driver.extraClassPath\", classpath)\\\n",
    "    .master(\"local[*]\").getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Loading the Data\n",
    "\n",
    "Now, we load the MNIST dataset into a Spark Dataframe, which dataset is available in LibSVM format at\n",
    "\n",
    "`s3://sagemaker-sample-data-[region]/spark/mnist/train/`\n",
    "\n",
    "where `[region]` is replaced with a supported AWS region, such as us-east-1.\n",
    "\n",
    "In order to train and make inferences our input DataFrame must have a column of Doubles (named \"label\" by default) and a column of Vectors of Doubles (named \"features\" by default).\n",
    "\n",
    "Spark's LibSVM DataFrameReader loads a DataFrame already suitable for training and inference.\n",
    "\n",
    "Here, we load into a DataFrame in the SparkSession running on the local Notebook Instance, but you can connect your Notebook Instance to a remote Spark cluster for heavier workloads. Starting from EMR 5.11.0, SageMaker Spark is pre-installed on EMR Spark clusters. For more on connecting your SageMaker Notebook Instance to a remote EMR cluster, please see [this blog post](https://aws.amazon.com/blogs/machine-learning/build-amazon-sagemaker-notebooks-backed-by-spark-in-amazon-emr/)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "\n",
    "region = boto3.Session().region_name\n",
    "spark._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 's3.{}.amazonaws.com'.format(region))\n",
    "\n",
    "trainingData = spark.read.format('libsvm')\\\n",
    "    .option('numFeatures', '784')\\\n",
    "    .option('vectorType', 'dense')\\\n",
    "    .load('s3a://sagemaker-sample-data-{}/spark/mnist/train/'.format(region))\n",
    "\n",
    "testData = spark.read.format('libsvm')\\\n",
    "    .option('numFeatures', '784')\\\n",
    "    .option('vectorType', 'dense')\\\n",
    "    .load('s3a://sagemaker-sample-data-{}/spark/mnist/test/'.format(region))\n",
    "\n",
    "trainingData.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Training and Hosting a Model\n",
    "Now we create an XGBoostSageMakerEstimator, which uses the XGBoost Amazon SageMaker Algorithm to train on our input data, and uses the XGBoost Amazon SageMaker model image to host our model.\n",
    "\n",
    "Calling fit() on this estimator will train our model on Amazon SageMaker, and then create an Amazon SageMaker Endpoint to host our model.\n",
    "\n",
    "We can then use the SageMakerModel returned by this call to fit() to transform Dataframes using our hosted model.\n",
    "\n",
    "The following cell runs a training job and creates an endpoint to host the resulting model, so this cell can take up to twenty minutes to complete."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import random\n",
    "from sagemaker_pyspark import IAMRole, S3DataPath\n",
    "from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator\n",
    "\n",
    "xgboost_estimator = XGBoostSageMakerEstimator(\n",
    "    sagemakerRole=IAMRole(role),\n",
    "    trainingInstanceType='ml.m4.xlarge',\n",
    "    trainingInstanceCount=1,\n",
    "    endpointInstanceType='ml.m4.xlarge',\n",
    "    endpointInitialInstanceCount=1)\n",
    "\n",
    "xgboost_estimator.setEta(0.2)\n",
    "xgboost_estimator.setGamma(4)\n",
    "xgboost_estimator.setMinChildWeight(6)\n",
    "xgboost_estimator.setSilent(0)\n",
    "xgboost_estimator.setObjective(\"multi:softmax\")\n",
    "xgboost_estimator.setNumClasses(10)\n",
    "xgboost_estimator.setNumRound(10)\n",
    "\n",
    "# train\n",
    "model = xgboost_estimator.fit(trainingData)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Inference\n",
    "Now we transform our DataFrame.\n",
    "To do this, we serialize each row's \"features\" Vector of Doubles into LibSVM format for inference against the Amazon SageMaker Endpoint. We deserialize the CSV responses from the XGBoost model back into our DataFrame. This serialization and deserialization is handled automatically by the `transform()` method:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "transformedData = model.transform(testData)\n",
    "\n",
    "transformedData.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "How well did the algorithm perform? Let us display the digits corresponding to each of the labels and manually inspect the results:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import DoubleType\n",
    "import matplotlib.pyplot as plt\n",
    "import numpy as np\n",
    "\n",
    "# helper function to display a digit\n",
    "def show_digit(img, caption='', xlabel='', subplot=None):\n",
    "    if subplot==None:\n",
    "        _,(subplot)=plt.subplots(1,1)\n",
    "    imgr=img.reshape((28,28))\n",
    "    subplot.axes.get_xaxis().set_ticks([])\n",
    "    subplot.axes.get_yaxis().set_ticks([])\n",
    "    plt.title(caption)\n",
    "    plt.xlabel(xlabel)\n",
    "    subplot.imshow(imgr, cmap='gray')\n",
    "\n",
    "images = np.array(transformedData.select(\"features\").cache().take(250))\n",
    "clusters = transformedData.select(\"prediction\").cache().take(250)\n",
    "\n",
    "for cluster in range(10):\n",
    "    print('\\n\\n\\nCluster {}:'.format(int(cluster)))\n",
    "    digits=[ img for l, img in zip(clusters, images) if int(l.prediction) == cluster ]\n",
    "    height=((len(digits) - 1) // 5) + 1\n",
    "    width=5\n",
    "    plt.rcParams[\"figure.figsize\"] = (width,height)\n",
    "    _, subplots = plt.subplots(height, width)\n",
    "    subplots=np.ndarray.flatten(subplots)\n",
    "    for subplot, image in zip(subplots, digits):\n",
    "        show_digit(image, subplot=subplot)\n",
    "    for subplot in subplots[len(digits):]:\n",
    "        subplot.axis('off')\n",
    "\n",
    "    plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Since we don't need to make any more inferences, now we delete the endpoint:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Delete the endpoint\n",
    "\n",
    "from sagemaker_pyspark import SageMakerResourceCleanup\n",
    "\n",
    "resource_cleanup = SageMakerResourceCleanup(model.sagemakerClient)\n",
    "resource_cleanup.deleteResources(model.getCreatedResources())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## More on SageMaker Spark\n",
    "\n",
    "The SageMaker Spark Github repository has more about SageMaker Spark, including how to use SageMaker Spark with your own algorithms on Amazon SageMaker: https://github.com/aws/sagemaker-spark\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.2"
  },
  "notice": "Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.  Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
